package com.atuguigu.flink.Day08;

import com.atuguigu.flink.Day04.Example4;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.sql.Timestamp;

import static org.apache.flink.table.api.Expressions.$;

// sql实现实时topN
public class Example6 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<Example4.UserBehavior> stream = env
                .readTextFile("E:\\data\\UserBehavior.csv")
                .map(new MapFunction<String, Example4.UserBehavior>() {
                    @Override
                    public Example4.UserBehavior map(String value) throws Exception {
                        String[] arr = value.split(",");
                        return new Example4.UserBehavior(arr[0], arr[1], arr[2], arr[3], Long.parseLong(arr[4]) * 1000L);
                    }
                })
                .filter(r -> r.behaviortype.equals("pv"))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Example4.UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Example4.UserBehavior>() {
                                    @Override
                                    public long extractTimestamp(Example4.UserBehavior element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );


        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        //SQL
        tEnv.createTemporaryView(
                "userbehavior",
                stream,
                $("itemId"),
                $("timestamp").rowtime().as("ts")
        );

        String innerSQL="SELECT itemId,COUNT(itemId)as itemCount HOP_END(ts,INTERVAL '5' MINUTE,INTERVAL '1' HOUR)" +
                " FROM userbehavior GROUP BY idemId,HOP(ts,INTERVAL '5' MINUTE,INTERVAL '1' HOUR)";
        String midSQL="SELECT *, ROW_NUMBER() OVER (PARTITION BY windowEnd ORDER BY itemCount DESC) as row_num" +
                " FROM (" + innerSQL + ")";

        // 取出前三名
        String outerSQL = "SELECT * FROM (" + midSQL + ") WHERE row_num <= 3";

        System.out.println(outerSQL);

        Table result = tEnv.sqlQuery(outerSQL);

        tEnv.toRetractStream(result, Row.class).filter(r -> r.f0.equals(true)).print();




        env.execute();
    }


    public static class UserBehavior {
        public String userId;
        public String itemId;
        public String categoryId;
        public String behaviorType;
        public Long timestamp;

        public UserBehavior() {
        }

        public UserBehavior(String userId, String itemId, String categoryId, String behaviorType, Long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.categoryId = categoryId;
            this.behaviorType = behaviorType;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "UserBehavior{" +
                    "userId='" + userId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", categoryId='" + categoryId + '\'' +
                    ", behaviorType='" + behaviorType + '\'' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }
}
